/*
 *  Copyright (C) 2004 Cidero, Inc.
 *
 *  Permission is hereby granted to any person obtaining a copy of 
 *  this software to use, copy, modify, merge, publish, and distribute
 *  the software for any non-commercial purpose, subject to the
 *  following conditions:
 *  
 *  The above copyright notice and this permission notice shall be included
 *  in all copies or substantial portions of the Software.
 *
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 
 *  OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 *  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
 *  THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 *  LIABILITY IN CONNECTION WITH THE SOFTWARE.
 * 
 *  File: $RCSfile: SyncGroup.java,v $
 *
 */

package com.cidero.proxy;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Vector;
import java.util.ArrayList;
import java.util.logging.Logger;

import com.cidero.http.*;
import com.cidero.util.ByteBufferQueue;
import com.cidero.util.MrUtil;
import com.cidero.util.AppPreferences;
import com.cidero.server.roku.*;

/**
 *
 * Synchronous  group class. Outputs streaming data to a group of one or more 
 * clients in a (hopefully) synchronized manner.
 *
 * Synchronization Strategy
 *
 * When two or more clients of the same device type make a request of 
 * the server within ~2 seconds of one another, they are put in the 
 * same group, and the server begins serving data to them at the same 
 * time. If a round-robin scheme with fairly small data chunks is used, this
 * seems to work acceptably (sometimes the sync is less perfect than others,
 * and it is probably dependent on the device type as well).
 *
 * Note that this strategy doesn't work for devices of different types, or
 * perhaps not even devices with different firmware revisions.  It's all 
 * dependent on the devices having the same internal buffering & latency.
 *
 * Also, even for devices of the same type, there will be inter-device
 * drift due to small differences in the clocking of the device's DAC.
 * In a test of two particular Roku Soundbridges, for example, the sync
 * starts off well, then after 2 hours or so of listening to an Internet
 * radio station one can notice an echo effect.  The stream has to be
 * stopped and restarted to correct for this ('slipping' the streams to
 * compensate tends to introduce audio glitches which are annoying).
 * 
 */
public class SyncGroup
{
  private static Logger logger = Logger.getLogger("com.cidero.control");

  HTTPProxySession parentSession;

  // Use vector for stream list since it is synchronized
  Vector  streamInfoList = new Vector();   
  boolean running = false;

  class StreamInfo
  {
    BufferedOutputStream stream;
    HTTPConnection connection;

    public StreamInfo( BufferedOutputStream stream,
                       HTTPConnection connection )
    {
      this.stream = stream;
      this.connection = connection;
    }

    public BufferedOutputStream getStream() {
      return stream;
    }

    public HTTPConnection getConnection() {
      return connection;
    }
  }
  
  public SyncGroup( HTTPProxySession parentSession,
                    BufferedOutputStream stream,
                    HTTPConnection connection )
  {
    this.parentSession = parentSession;

    StreamInfo streamInfo = new StreamInfo( stream, connection );
    String reqHost = connection.getRemoteAddress();
    
    streamInfoList.add( streamInfo );
  }
  
  /*
   *  Add a new stream to the synchronzed group. This function may be
   *  invoked by other threads
   */
  public synchronized void addStream( BufferedOutputStream stream,
                                      HTTPConnection connection )
  {
    StreamInfo streamInfo = new StreamInfo( stream, connection );
    String reqHost = connection.getRemoteAddress();
    streamInfoList.add( streamInfo );
    //logger.fine("Added device to synchronized group - nDevices = " +
    //                streamInfoList.size() );
  }

  public void stop() {
    running = false;
  }
  
  /**
   *  Run a synchronous shoutcast session. This normally runs in the
   *  context of the HTTP-GET request thread of the 1st requester in the
   *  group.
   *
   *  The run routine returns when end of input from master server
   *  detected, or if *all* clients disconnected
   *
   *  Notes:
   * 
   *  'Master' renderer in the group will normally join the 
   *  session last (because the controller issues the slaves
   *  'play' commands before the master play command). Assuming
   *  that the master is the primary 'listening post', write 
   *  data to it last (don't alter normal write loop order here in 
   *  other words). The normal sound propagation delay of 
   *  the non-masters due to distance from the listener should
   *  be compensated for a little bit by sending data to them
   *  slightly earlier.
   */ 
  public void run()
  {
    running = true;
    
    //
    //  Attach to queue with streaming data.
    //
    ByteBufferQueue queue = parentSession.getQueue();

    int bytes;
    ByteBuffer qBuf;
    byte[] buf = new byte[65536];
    long totalBytes = 0;
    long lastTotal = 0;
    StreamInfo streamInfo = (StreamInfo)streamInfoList.get(0);
    BufferedOutputStream stream = streamInfo.getStream();

    logger.info(" SyncGroup: starting xfers, nClients = " +
                streamInfoList.size() );

    // 
    // Establish 'slow-xfer' window based on bit rate
    // Tuned for Roku at the moment...
    //
    // Roku Notes:
    //
    //    For 192K MP3's (local net), seems like ~64Kbytes 
    //    buffer used before start of playback
    //
    //    WMA lossess PCM files seems like ~200K before start of playback
    //

    int slowXferWindowStart;
    int slowXferWindowEnd;
    int slowXferSleepMillis;

    if( parentSession.getBitRate() < (32*1024) )
    {
      slowXferWindowStart = 0;
      slowXferWindowEnd = 96*1024;
      slowXferSleepMillis = 30;
    }
    else if( parentSession.getBitRate() < (64*1024) )
    {
      slowXferWindowStart = 48*1024;
      slowXferWindowEnd = 144*1024;
      slowXferSleepMillis = 30;
    }
    else
    {
      slowXferWindowStart = 128*1024;
      slowXferWindowEnd = 200*1024;
      slowXferSleepMillis = 20;
    }

    logger.info("media bitRate: " + parentSession.getBitRate() + 
                " slowXferWindow Start,End = " + 
                slowXferWindowStart + "," + slowXferWindowEnd  + 
                " sleep(ms): " + slowXferSleepMillis );
    
    while( running )
    {
      try 
      {
        while( (qBuf = queue.get( 5000 )) == null )
        {
          logger.info("SyncGroup: 5-sec timeout reading queue " );
          if( !running ) 
            return;
        }
        
        // qBuf.limit() contains the number of bytes actually received
        bytes = qBuf.limit();
        qBuf.get( buf, 0, bytes );  // copy to local buf
      }
      catch( IOException e )
      {
        logger.info("SyncGroup: Queue EOF - totalBytes = " + totalBytes );
        running = false;
        break;
      }
      
      totalBytes += bytes;
      if( bytes != HTTPProxySession.QUEUE_PACKET_SIZE )
      {
        logger.warning("Bytes < QUEUE_PACKET_SIZE: " + bytes +
                       " totalBytes: " + totalBytes );
      }
      
      try 
      {
        if( (totalBytes > slowXferWindowStart) && 
            (totalBytes < slowXferWindowEnd) )
        {
          //
          // Slow start algorithm to reduce potential network saturation &
          // retransmits at the point where the buffer is just filling up
          // enough to start playing
          //
        
          // Experimenting with sending data in 1024-byte chunks to improve
          // sync resolution (maybe not necessary)

          int bytesRemaining = bytes;

          for( int m = 0 ; (m < 4) && (bytesRemaining > 0) ; m++ )
          {
            int writeBytes = 1024;
            if( bytesRemaining < 1024 )
              writeBytes = bytesRemaining;

            for( int n = 0 ; n < streamInfoList.size() ; n++ )
            {
              streamInfo = (StreamInfo)streamInfoList.get(n);
              stream = streamInfo.getStream();
              stream.write( buf, m*1024, writeBytes );
            }
            
            // Do flush separately to reduce sequential system call latency
            for( int n = 0 ; n < streamInfoList.size() ; n++ )
            {
              streamInfo = (StreamInfo)streamInfoList.get(n);
              stream = streamInfo.getStream();
              stream.flush();
            }
            
            bytesRemaining -= writeBytes;
            MrUtil.sleep( slowXferSleepMillis );
          }
        }
        else
        {
          // Normal full-speed writes of the data streams
          for( int n = 0 ; n < streamInfoList.size() ; n++ )
          {
            streamInfo = (StreamInfo)streamInfoList.get(n);
            stream = streamInfo.getStream();
            stream.write( buf, 0, bytes );
            stream.flush();
          }
        }
        
        //if( (totalBytes % 8192) == 0 )
        //  System.out.println(" totalBytes = " + totalBytes );

      }
      catch( IOException e )
      {
        logger.info("IOExeption writing to socket (renderer closed connection" );

        // remove the offending (presumably closed) stream
        streamInfoList.remove( streamInfo );
        if( streamInfoList.size() == 0 )
          running = false;
      }

      if( (totalBytes - lastTotal) > 65536*2 )
      {
        //logger.fine("Sync Server: totalBytes = " + totalBytes );
        lastTotal = totalBytes;
      }
              
      //System.out.println("Proxy server - freeing packet");
      queue.free( qBuf );

      if( bytes != HTTPProxySession.QUEUE_PACKET_SIZE )
      {
        running = false;
        logger.fine("Stopping sync group due to trailing odd packet of " +
                    bytes + " bytes" );
      }

    } // while ( running )
    
    //
    // Done - close off the connections
    //
    close();
  }
  
  public void close()
  {
    logger.fine("SyncGroup:close - entered" );

    // Only need to call close for streams other than the 'master' creator
    for( int n = 1 ; n < streamInfoList.size() ; n++ )
    {
      StreamInfo streamInfo;

      streamInfo = (StreamInfo)streamInfoList.get(n);

      HTTPConnection connection = streamInfo.getConnection();
      logger.fine("SyncGroup: closing connection" );
      connection.close();
    }

    logger.fine("SyncGroup:close - leaving" );
  }

}
